package com.sailing.lianxi.service;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import oracle.net.aso.n;

/**
 * Handles a single record pulled from Kafka: prints it and then stores the
 * record's topic, partition and offset through {@link MessageDao}.
 */
public class ProcessDataService {

	/** Fully qualified so that the file's import block does not have to change. */
	private static final java.util.logging.Logger LOG =
			java.util.logging.Logger.getLogger(ProcessDataService.class.getName());

	/**
	 * Processes one consumed record and then records its offset.
	 *
	 * <p>Any exception raised while printing the record or updating the offset
	 * is logged and NOT rethrown, so the caller keeps running. Previously such
	 * failures were swallowed silently, which hid offset-update errors.
	 *
	 * @param record the consumed record whose topic, partition and offset are
	 *               passed to {@code MessageDao.update}
	 */
	public void process(ConsumerRecord<String, String> record) {
		MessageDao messageDao = new MessageDao();
		try {
			// Handle the message; for now it is simply printed together with
			// the name of the thread that is processing it.
			System.out.println(">>>>>>>>>" + Thread.currentThread().getName() + "_" + record);
			// Store the offset of the record that has just been handled.
			messageDao.update(record.topic(), record.partition(), record.offset());
		} catch (Exception e) {
			// Best-effort contract is kept (nothing propagates to the caller),
			// but the failure is now visible together with its cause.
			LOG.log(java.util.logging.Level.SEVERE,
					"Failed to process record or update its offset: " + record, e);
		}
	}
}
